package cn.doitedu.rtmk.demo.demo1;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/9
 * @Desc: 学大数据，到多易教育
 * <p>
 * 事件驱动型的基于规则的自动化实时营销系统DEMO
 * {"userId":1,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":1,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":1,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":1,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 * {"userId":2,"eventId":"event1","properties":{},"timeStamp":1675935310000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935311000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935312000}
 * {"userId":2,"eventId":"item_like","properties":{"item_price":150},"timeStamp":1675935313000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935314000}
 * {"userId":2,"eventId":"item_share","properties":{"item_price":150},"timeStamp":1675935315000}
 * {"userId":2,"eventId":"add_cart","properties":{"item_price":150},"timeStamp":1675935316000}
 *
 **/
public class Demo1 {
    /**
     * Builds and runs the demo job: reads JSON user events from Kafka, keys them by user id,
     * and emits an {@code RtmkMessage} for rule {@code "rule-001"} whenever a trigger event
     * (an {@code add_cart} with {@code item_price > 100}) arrives for a user who satisfies both
     * the offline profile conditions (looked up in HBase) and the real-time conditions
     * (recent share / like events kept in TTL-bounded keyed state).
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        // Set up the execution environment; checkpoint every 2000 ms to the configured storage path.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");

        // Read the detailed behaviour log from Kafka; each record is one JSON-encoded user event.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setGroupId("gpac01" + System.currentTimeMillis())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setTopics("rtmk-test-data")
                .build();
        DataStreamSource<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "s");
        SingleOutputStreamOperator<EventBean> beans = ds.map(json -> JSON.parseObject(json, EventBean.class));

        SingleOutputStreamOperator<RtmkMessage> resultStream = beans.keyBy(EventBean::getUserId)
                .process(new KeyedProcessFunction<Integer, EventBean, RtmkMessage>() {
                    // HBase handles are created in open() and released in close(); transient so
                    // they are never part of the serialized function shipped to the tasks.
                    transient Connection conn;
                    transient Table table;

                    // Column family and qualifiers of the offline profile table.
                    final byte[] FAMILY = Bytes.toBytes("f");
                    final byte[] AGE = Bytes.toBytes("age");
                    final byte[] GENDER = Bytes.toBytes("gender");
                    final byte[] TG03 = Bytes.toBytes("tg03");

                    // Per-user "item_share" events (5-minute TTL) and "item_like" events (30-minute TTL).
                    transient ListState<EventBean> con1State;
                    transient ListState<EventBean> con2State;

                    /**
                     * Opens the HBase connection to the profile table and registers the two
                     * TTL-bounded list states.
                     */
                    @Override
                    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                        Configuration config = HBaseConfiguration.create();
                        config.set("hbase.zookeeper.quorum", "doitedu:2181");
                        conn = ConnectionFactory.createConnection(config);
                        table = conn.getTable(TableName.valueOf("user_profile"));

                        // TTL must be enabled on each descriptor before the state handle is obtained.
                        con1State = getRuntimeContext().getListState(ttlDescriptor("con1_state", Time.minutes(5)));
                        con2State = getRuntimeContext().getListState(ttlDescriptor("con2_state", Time.minutes(30)));
                    }

                    /**
                     * Releases the HBase table and connection when the task shuts down.
                     */
                    @Override
                    public void close() throws Exception {
                        try {
                            if (table != null) {
                                table.close();
                            }
                        } finally {
                            if (conn != null) {
                                conn.close();
                            }
                        }
                    }

                    /**
                     * Records share/like events in state and, for a trigger event, evaluates the
                     * offline and real-time conditions of the rule; emits a message when all hold.
                     */
                    @Override
                    public void processElement(EventBean eventBean, KeyedProcessFunction<Integer, EventBean, RtmkMessage>.Context ctx, Collector<RtmkMessage> out) throws Exception {

                        // Maintain the real-time profile tags the rule depends on.
                        // Constant-first equals tolerates events without an event id.
                        String eventId = eventBean.getEventId();
                        if ("item_share".equals(eventId)) {
                            con1State.add(eventBean);
                        }
                        if ("item_like".equals(eventId)) {
                            con2State.add(eventBean);
                        }

                        // Only the rule's trigger event (add_cart with item_price > 100) goes further.
                        if (!isTriggerEvent(eventBean)) {
                            return;
                        }
                        System.out.println(eventBean.getUserId() + " 触发了规则");

                        // Offline (static) profile conditions, read from the HBase profile table:
                        //   age in (20, 40], gender == male, tg03 (6-month average monthly spend) > 1000.
                        // NOTE(review): this is a blocking lookup per trigger event.
                        Result result = table.get(new Get(Bytes.toBytes(eventBean.getUserId())));
                        byte[] ageBytes = result.getValue(FAMILY, AGE);
                        byte[] genderBytes = result.getValue(FAMILY, GENDER);
                        byte[] tg03Bytes = result.getValue(FAMILY, TG03);
                        if (ageBytes == null || genderBytes == null || tg03Bytes == null) {
                            // No profile row, or an incomplete one: the user cannot satisfy the rule.
                            return;
                        }

                        int age = Bytes.toInt(ageBytes);
                        String gender = Bytes.toString(genderBytes);
                        int tg03 = Bytes.toInt(tg03Bytes);
                        if (!(age > 20 && age <= 40 && "male".equals(gender) && tg03 > 1000)) {
                            return;
                        }

                        // Real-time (dynamic) profile conditions:
                        //   share events in the last 5 minutes:  more than 2
                        //   like events in the last 30 minutes:  at least 1
                        // NOTE(review): the rule text reads "2 or more shares" while the code requires
                        // more than 2; the like check (> 0) implements "1 or more". Confirm whether
                        // >= 2 was intended before changing the threshold.
                        if (countOf(con1State.get()) > 2 && countOf(con2State.get()) > 0) {
                            // All offline and real-time conditions hold: emit the marketing message.
                            out.collect(new RtmkMessage(eventBean.getUserId(), eventBean.getTimeStamp(), "rule-001"));
                        }
                    }

                    /** Builds a list-state descriptor whose entries expire after {@code ttl} of processing time. */
                    private ListStateDescriptor<EventBean> ttlDescriptor(String name, Time ttl) {
                        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(ttl).useProcessingTime().build();
                        ListStateDescriptor<EventBean> descriptor = new ListStateDescriptor<>(name, EventBean.class);
                        descriptor.enableTimeToLive(ttlConfig);
                        return descriptor;
                    }

                    /** Returns true for an add_cart event whose item_price property parses to a value above 100. */
                    private boolean isTriggerEvent(EventBean eventBean) {
                        if (!"add_cart".equals(eventBean.getEventId()) || eventBean.getProperties() == null) {
                            return false;
                        }
                        String price = eventBean.getProperties().get("item_price");
                        if (price == null) {
                            return false;
                        }
                        try {
                            return Double.parseDouble(price) > 100;
                        } catch (NumberFormatException ignored) {
                            // A malformed price cannot satisfy the trigger; skip it rather than fail the job.
                            return false;
                        }
                    }

                    /** Counts the elements of a state iterable; a null iterable counts as empty. */
                    private int countOf(Iterable<EventBean> events) {
                        int count = 0;
                        if (events != null) {
                            for (EventBean ignored : events) {
                                count++;
                            }
                        }
                        return count;
                    }
                });

        resultStream.print();

        env.execute();
    }
}
